/**
 * IO related functions
 */

module unit_threaded.io;

import unit_threaded.from;

/**
 * Write if debug output was enabled.
 *
 * The body is wrapped in a `debug` block, so in non-debug builds this
 * function compiles to a no-op.
 */
void writelnUt(T...)(auto ref T args) {
    debug {
        import unit_threaded.testcase : TestCase;

        if (isDebugOutputEnabled)
            TestCase.currentTest.getWriter.writeln(args);
    }
}

private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
/// Thread-local flag computed by the module constructor below (Posix only).
bool _useEscCodes;
/// ANSI escape sequences, indexed by `Colour` (red, green, yellow, cancel).
enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];

// Thread-local module constructor: runs once for every thread that starts.
// On Posix, escape codes are enabled when they were forced or when stdout
// is a terminal. On other platforms `_useEscCodes` keeps its default (false).
static this() {
    version (Posix) {
        import std.stdio : stdout;
        import core.sys.posix.unistd : isatty;

        _useEscCodes = escCodesForced() || isatty(stdout.fileno()) != 0;
    }
}

/**
 * Enables (or disables) debug output for all threads.
 *
 * Params:
 *   value = the new state of the debug-output flag
 */
package void enableDebugOutput(bool value = true) nothrow {
    import core.atomic : atomicStore;

    // A bare `synchronized` statement uses its own mutex per statement, so it
    // would not order this write against the read in isDebugOutputEnabled.
    // An atomic store does.
    atomicStore(_debugOutput, value);
}

/**
 * Returns: whether debug output is currently enabled.
 */
package bool isDebugOutputEnabled() nothrow @trusted {
    import core.atomic : atomicLoad;

    return atomicLoad(_debugOutput);
}

/**
 * Requests ANSI escape codes even when stdout is not a terminal.
 */
package void forceEscCodes() nothrow {
    import core.atomic : atomicStore;

    atomicStore(_forceEscCodes, true);
}

/**
 * Returns: whether escape codes were forced via `forceEscCodes`.
 * Always false on non-Posix platforms, where colouring is not supported
 * by this module.
 */
private bool escCodesForced() nothrow @trusted {
    version (Posix) {
        import core.atomic : atomicLoad;

        return atomicLoad(_forceEscCodes);
    } else {
        return false;
    }
}

/**
 * Destination for test output.
 */
interface Output {
    /// Hands `output` over to the destination.
    void send(in string output) @safe;
    /// Signals that the caller has finished a unit of output.
    void flush() @safe;
}

// The member order must match the order of the entries in `_escCodes`.
private enum Colour {
    red,
    green,
    yellow,
    cancel,
}

/**
 * Wraps `msg` in the escape code for colour `C` followed by the cancel code.
 * Both codes are empty strings when escape codes are not in use.
 */
private string colour(alias C)(in string msg) {
    return escCode(C) ~ msg ~ escCode(Colour.cancel);
}

private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);

/**
 * Returns the escape sequence for `code`, or an empty string when escape
 * codes are not in use. Nothing is written to the console here.
 */
private string escCode(in Colour code) @safe {
    // `_useEscCodes` is only computed when a thread starts, so a thread that
    // was already running when forceEscCodes() was called would otherwise
    // never honour the request. Check the force flag at use time as well.
    return (_useEscCodes || escCodesForced()) ? _escCodes[code] : "";
}

/**
 * Converts the args to text and sends the result to `output`.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv : text;

    output.send(text(args));
}

/**
 * Converts the args to text, appends a newline and sends the result
 * to `output`.
 */
void writeln(T...)(Output output, auto ref T args) {
    write(output, args, "\n");
}

/**
 * Sends the args to `output` in green (POSIX only)
 * and appends a newline.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv : text;

    output.send(green(text(args) ~ "\n"));
}

/**
 * Sends the args to `output` in red (POSIX only)
 * and appends a newline.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    writeRed(output, args, "\n");
}

/**
 * Sends the args to `output` in red (POSIX only).
 * No newline is appended.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv : text;

    output.send(red(text(args)));
}

/**
 * Sends the args to `output` in yellow (POSIX only).
 * No newline is appended.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv : text;

    output.send(yellow(text(args)));
}

/**
 * Thread to output to stdout
 */
class WriterThread : Output {

    import std.concurrency : Tid;

    /**
     * Returns a reference to the only instance of this class.
     */
    static WriterThread get() @trusted {
        import std.concurrency : initOnce;

        static __gshared WriterThread instance;
        return initOnce!instance(new WriterThread);
    }

    /**
     * With `version (unitUnthreaded)` the text is written directly with
     * `std.stdio.write`; otherwise it is sent, together with the caller's
     * Tid, to the writer thread.
     */
    override void send(in string output) @safe {

        version (unitUnthreaded) {
            import std.stdio : write;

            write(output);
        } else {
            import std.concurrency : send, thisTid;

            () @trusted{ _tid.send(output, thisTid); }();
        }
    }

    /**
     * Sends a `Flush` message with the caller's Tid to the writer thread.
     * Does nothing with `version (unitUnthreaded)`.
     */
    override void flush() @safe {
        version (unitUnthreaded) {
        } else {
            import std.concurrency : send, thisTid;

            () @trusted{ _tid.send(Flush(), thisTid); }();
        }
    }

private:

    // Spawns the writer thread and blocks until it has answered the
    // ThreadWait message with ThreadStarted.
    this() {
        version (unitUnthreaded) {
        } else {
            import std.concurrency : spawn, thisTid, receiveOnly, send;
            import std.stdio : stdout, stderr;

            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
            _tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }

    Tid _tid;
}

// Marker messages exchanged with the writer thread.
struct ThreadWait {
}

struct ThreadFinish {
}

struct ThreadStarted {
}

struct ThreadEnded {
}

struct Flush {
}

version (Posix) {
    enum nullFileName = "/dev/null";
} else {
    enum nullFileName = "NUL";
}

/**
 * Body of the writer thread.
 *
 * Receives messages until `ThreadFinish` or `OwnerTerminated` arrives and
 * prints the received text to the saved copy of `OUT`. Unless debug output
 * is enabled, `OUT` and `ERR` are redirected to the null device for the
 * duration; they are restored before the function returns or when it fails.
 *
 * Params:
 *   OUT = the file object used as standard output
 *   ERR = the file object used as standard error
 *   tid = the thread that is notified with `ThreadStarted` / `ThreadEnded`
 */
void threadWriter(alias OUT, alias ERR)(from!"std.concurrency".Tid tid) {
    import std.concurrency : receive, send, OwnerTerminated, Tid;

    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure)
        restore;

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // Always prints to the saved stdout, never to the redirected OUT.
    void actuallyPrint(in string msg) {
        if (msg.length)
            saveStdout.write(msg);
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    static struct ThreadOutput {
        string currentOutput; // text received since the thread's last Flush
        string[] outputs; // flushed chunks that are waiting to be printed

        void store(in string msg) {
            currentOutput ~= msg;
        }

        void flush() {
            outputs ~= currentOutput;
            currentOutput = "";
        }
    }

    ThreadOutput[Tid] outputs;

    // Tid.init means that no thread currently owns stdout.
    Tid currentTid;

    while (!done) {
        receive((string msg, Tid originTid) {

            if (currentTid == currentTid.init) {
                currentTid = originTid;

                // it could be that this thread became the current thread but had output not yet printed
                if (originTid in outputs) {
                    actuallyPrint(outputs[originTid].currentOutput);
                    outputs[originTid].currentOutput = "";
                }
            }

            if (currentTid == originTid)
                actuallyPrint(msg);
            else {
                if (originTid !in outputs)
                    outputs[originTid] = typeof(outputs[originTid]).init;
                outputs[originTid].store(msg);
            }
        }, (ThreadWait w) { tid.send(ThreadStarted()); }, (ThreadFinish f) {
            done = true;
        }, (Flush f, Tid originTid) {

            if (originTid in outputs)
                outputs[originTid].flush;

            // another thread still owns stdout: keep the chunk stored
            if (currentTid != currentTid.init && currentTid != originTid)
                return;

            // print every flushed chunk of every thread, then release stdout
            foreach (ref threadOutput; outputs) {
                foreach (o; threadOutput.outputs)
                    actuallyPrint(o);
                threadOutput.outputs = [];
            }

            currentTid = currentTid.init;
        }, (OwnerTerminated trm) { done = true; });
    }

    restore;
    tid.send(ThreadEnded());
}